package org.codehaus.activemq.store.jdbm;

import java.io.IOException;
import javax.jms.JMSException;
import jdbm.btree.BTree;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import org.codehaus.activemq.AlreadyClosedException;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * A {@link TopicMessageStore} backed by JDBM {@link BTree} tables.
 *
 * <p>On top of the message table and ordered index handled by
 * {@link JdbmMessageStore}, this class keeps three further tables:
 * <ul>
 *   <li>{@code ackDatabase} - subscription persistent key to the last acknowledged
 *       {@link MessageIdentity};</li>
 *   <li>{@code messageCounts} - {@link MessageIdentity} to an {@code Integer} counter;</li>
 *   <li>{@code subscriberDetails} - consumer key to {@link SubscriberEntry}.</li>
 * </ul>
 *
 * <p>After {@link #stop()} the ack and message-count tables are released and the methods
 * that need them fail with {@link AlreadyClosedException}.
 */
public class JdbmTopicMessageStore extends JdbmMessageStore
  implements TopicMessageStore
{
  /** Initial counter value stored the first time a message is counted. */
  private static final Integer ONE = Integer.valueOf(1);

  private BTree ackDatabase;
  private BTree messageCounts;
  private final BTree subscriberDetails;

  /**
   * Creates a store over the given tables.
   *
   * @param messageTable      passed through to {@link JdbmMessageStore}
   * @param orderedIndex      passed through to {@link JdbmMessageStore}
   * @param ackDatabase       table of last acknowledged message identities per subscription
   * @param subscriberDetails table of subscriber entries per consumer key
   * @param messageCounts     table of per-message counters
   */
  public JdbmTopicMessageStore(BTree messageTable, BTree orderedIndex, BTree ackDatabase, BTree subscriberDetails, BTree messageCounts)
  {
    super(messageTable, orderedIndex);
    this.ackDatabase = ackDatabase;
    this.subscriberDetails = subscriberDetails;
    this.messageCounts = messageCounts;
  }

  /**
   * Adds one to the counter stored for the given message, creating it with a value of 1
   * when no counter exists yet.
   *
   * @param messageId the message whose counter is incremented
   * @throws JMSException if the store is already stopped or the table cannot be accessed
   */
  public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
    try {
      BTree counts = getMessageCounts();
      Integer current = (Integer) counts.find(messageId);
      Integer updated = (current == null) ? ONE : Integer.valueOf(current.intValue() + 1);
      counts.insert(messageId, updated, true);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for  messageID: " + messageId + ". Reason: " + e, e);
    }
  }

  /**
   * Subtracts one from the counter stored for the given message. When there is no counter,
   * or the counter is 1 or less, the message itself is removed via
   * {@code removeMessage(messageIdentity, ack)} and any existing counter entry is deleted.
   *
   * @param messageIdentity the message whose counter is decremented
   * @param ack             the acknowledgement handed to {@code removeMessage}
   * @throws JMSException if the store is already stopped or the table cannot be accessed
   */
  public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    try {
      BTree counts = getMessageCounts();
      Integer current = (Integer) counts.find(messageIdentity);
      if ((current == null) || (current.intValue() <= 1)) {
        removeMessage(messageIdentity, ack);
        if (current != null) {
          counts.remove(messageIdentity);
        }
      }
      else {
        counts.insert(messageIdentity, Integer.valueOf(current.intValue() - 1), true);
      }
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to decrement messageCount for  messageID: " + messageIdentity + ". Reason: " + e, e);
    }
  }

  /**
   * Records the given message identity as the last one acknowledged by the subscription,
   * replacing any previous value stored under the subscription's persistent key.
   *
   * @param subscription    the subscription whose persistent key is used as the table key
   * @param messageIdentity the identity to store
   * @throws JMSException if the store is already stopped or the table cannot be written
   */
  public synchronized void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
    String key = subscription.getPersistentKey();
    try {
      getAckDatabase().insert(key, messageIdentity, true);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to set ack messageID: " + messageIdentity + " for consumerId: " + key + ". Reason: " + e, e);
    }
  }

  /**
   * Re-delivers to the subscription every stored message whose sequence number is strictly
   * greater than that of the subscription's last acknowledged message.
   *
   * <p>When no last acknowledged identity is stored for the subscription,
   * {@code lastDispatchedMessage} is stored as the last acknowledged identity and used as
   * the starting point.
   *
   * @param subscription          the subscription to feed with
   *                              {@code addMessage(getContainer(), message)}
   * @param lastDispatchedMessage fallback starting point when nothing was acknowledged yet
   * @throws JMSException if the store is already stopped or a table cannot be read
   */
  public synchronized void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
    try {
      MessageIdentity lastAcked = getLastAcknowledgedMessageIdentity(subscription);
      if (lastAcked == null)
      {
        setLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
        lastAcked = lastDispatchedMessage;
      }
      // Index keys are read back as Long below, so the starting key is treated as a Long too.
      // NOTE(review): behaviour for an identity without a sequence number is not defined
      // by this class - confirm callers always supply one.
      Long lastAckedSequenceNumber = (Long) lastAcked.getSequenceNumber();

      // Start browsing at the last acknowledged position instead of scanning the whole
      // index. A fresh Tuple is used as the holder: the result of findGreaterOrEqual() is
      // null when no key is >= the requested one, which made getNext() fail.
      Tuple tuple = new Tuple();
      TupleBrowser iter = getOrderedIndex().browse(lastAckedSequenceNumber);
      while (iter.getNext(tuple)) {
        Long sequenceNumber = (Long) tuple.getKey();
        // Strictly greater: the last acknowledged message itself must not be replayed.
        if (sequenceNumber.compareTo(lastAckedSequenceNumber) > 0) {
          ActiveMQMessage message = getMessageBySequenceNumber(sequenceNumber);
          if (message != null) {
            subscription.addMessage(getContainer(), message);
          }
        }
      }
    }
    catch (IOException e)
    {
      throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
    }
  }

  /**
   * Returns an identity that carries no message ID and whose sequence number is the value
   * of {@code getLastSequenceNumber()}.
   *
   * @return a new {@link MessageIdentity} for the last sequence number
   * @throws JMSException declared by the interface; see {@code getLastSequenceNumber()}
   */
  public synchronized MessageIdentity getLastestMessageIdentity() throws JMSException {
    return new MessageIdentity(null, Long.valueOf(getLastSequenceNumber()));
  }

  /**
   * Looks up the subscriber entry stored under the consumer key of {@code info}.
   *
   * <p>NOTE(review): unlike the other accessors this method is not {@code synchronized}
   * and does not check whether the store has been stopped - confirm that is intended.
   *
   * @param info the consumer whose key is looked up
   * @return the stored entry, or whatever the table returns when there is none
   * @throws JMSException if the table cannot be read
   */
  public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
    Object key = info.getConsumerKey();
    try {
      return (SubscriberEntry) this.subscriberDetails.find(key);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
    }
  }

  /**
   * Stores the subscriber entry under the consumer key of {@code info}, replacing any
   * existing entry.
   *
   * @param info            the consumer whose key is used as the table key
   * @param subscriberEntry the entry to store
   * @throws JMSException if the table cannot be written
   */
  public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException
  {
    Object key = info.getConsumerKey();
    try {
      this.subscriberDetails.insert(key, subscriberEntry, true);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to store subscription for info: " + info + ". Reason: " + e, e);
    }
  }

  /**
   * Closes the ack and message-count tables, drops the references to them and then stops
   * the superclass. An exception reported by {@code closeTable} is rethrown only after
   * {@code super.stop()} has returned.
   *
   * <p>NOTE(review): {@code subscriberDetails} is neither closed nor released here -
   * confirm whether its lifecycle is managed elsewhere.
   *
   * @throws JMSException the exception reported while closing the tables, or one thrown by
   *                      {@code super.stop()}
   */
  public synchronized void stop() throws JMSException {
    JMSException firstException = closeTable(this.ackDatabase, null);
    firstException = closeTable(this.messageCounts, firstException);
    this.ackDatabase = null;
    this.messageCounts = null;
    super.stop();
    if (firstException != null) {
      throw firstException;
    }
  }

  /**
   * Returns the message-count table.
   *
   * @return the table, never {@code null}
   * @throws AlreadyClosedException if the table reference has been released by {@link #stop()}
   */
  protected BTree getMessageCounts()
    throws AlreadyClosedException
  {
    if (this.messageCounts == null) {
      throw new AlreadyClosedException("JDBM TopicMessageStore");
    }
    return this.messageCounts;
  }

  /**
   * Returns the acknowledgement table.
   *
   * @return the table, never {@code null}
   * @throws AlreadyClosedException if the table reference has been released by {@link #stop()}
   */
  protected BTree getAckDatabase() throws AlreadyClosedException {
    if (this.ackDatabase == null) {
      throw new AlreadyClosedException("JDBM TopicMessageStore");
    }
    return this.ackDatabase;
  }

  /**
   * Reads the last acknowledged message identity stored under the subscription's
   * persistent key.
   *
   * @param subscription the subscription to look up
   * @return the stored identity; {@link #recoverSubscription} treats {@code null} as
   *         "nothing acknowledged yet"
   * @throws IOException            if the table cannot be read
   * @throws AlreadyClosedException if the store has been stopped
   */
  protected MessageIdentity getLastAcknowledgedMessageIdentity(Subscription subscription) throws IOException, AlreadyClosedException {
    return (MessageIdentity) getAckDatabase().find(subscription.getPersistentKey());
  }
}